package com.zhiche.wms.service.mqtt;


import com.zhiche.wms.service.utils.MQTTUtil;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @Author: caiHua
 * @Description: 发布/订阅
 * @Date: Create in 11:49 2019/8/27
 */
public class PublishSubscribe {
    private static final Logger LOGGER = LoggerFactory.getLogger(PublishSubscribe.class);

    private static MqttClientPersistence persistence = new MemoryPersistence();

    /**
     * 消息发布
     *
     * @param pushJson
     */
    public static void publish (String pushJson) {
        try {
            LOGGER.info("clientID====" + MQTTUtil.clientID);
            MqttClient client = new MqttClient(MQTTUtil.serviceURI, MQTTUtil.clientID, persistence);
            MqttConnectOptions connectOptions = new MqttConnectOptions();
            connectOptions.setUserName(MQTTUtil.username);
            connectOptions.setPassword(MQTTUtil.password.toCharArray());
            //设置是否清空session,false表示服务器会保留客户端的连接记录，true表示每次连接到服务器都以新的身份连接
            connectOptions.setCleanSession(false);
            //设置超时时间 单位为秒
            connectOptions.setConnectionTimeout(60);
            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线，但这个方法并没有重连的机制
            connectOptions.setKeepAliveInterval(20);
            // 设置回调
            client.setCallback(new PushCallback());
            //发布者连接服务
            client.connect(connectOptions);
            LOGGER.info("发布者连接状态： " + client.isConnected());

            MqttTopic mqttTopic = client.getTopic(MQTTUtil.topic);
            LOGGER.info("mqttTopic ============" + mqttTopic);

            //设置消息内容
            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setQos(MQTTUtil.qos);
            mqttMessage.setRetained(true);
            mqttMessage.setPayload(pushJson.getBytes());

            MqttDeliveryToken deliveryToken = mqttTopic.publish(mqttMessage);
            deliveryToken.waitForCompletion();
            //如果消息发布成功deliveryToken.getMessage()返回的为null
            LOGGER.info(String.valueOf(deliveryToken.getMessage()));
            if (!deliveryToken.isComplete()) {
                LOGGER.info("发布者发布消息： " + MQTTUtil.clientID + " 失败");
            } else {
                LOGGER.info("发布者发布消息： " + MQTTUtil.clientID + " 成功");
            }
            client.disconnect();
        } catch (Exception e) {
            LOGGER.error("发布失败 {}", e.getMessage());
        }
    }

    /**
     * 消息订阅
     **/
    public static void subscribe () {
        try {
            MqttClient client = new MqttClient(MQTTUtil.serviceURI, MQTTUtil.clientID, persistence);
            client.setCallback(new MqttCallback() {
                @Override
                public void connectionLost (Throwable cause) {
                    LOGGER.info("订阅者连接丢失...");
                    LOGGER.info(cause.getMessage());
                }

                @Override
                public void messageArrived (String topic, MqttMessage message) {
                    LOGGER.info("订阅者接收到消息： " + message.toString());
                }

                @Override
                public void deliveryComplete (IMqttDeliveryToken token) {
                }
            });
            MqttConnectOptions connectOptions = new MqttConnectOptions();
            connectOptions.setUserName(MQTTUtil.username);
            connectOptions.setPassword(MQTTUtil.password.toCharArray());
            connectOptions.setCleanSession(false);
            //订阅者连接订阅主题
            client.connect(connectOptions);
            client.subscribe(MQTTUtil.topic, MQTTUtil.qos);
            LOGGER.info("订阅者连接状态： " + client.isConnected());
            client.disconnect();
        } catch (MqttException e) {
            LOGGER.error("发布成功 {}", e.getMessage());
        }

    }
}
